# Author: Stephen Situ
# Apache Kafka is an event streaming platform used for building real time data pipelines and streaming applications. Using a Avien
# Kafka service and the kafka-python library, we create a streaming data pipeline. Kafka uses the concepts of producers and consumers,
# with brokers acting as the middleman. The broker has topics, and within the topics themselves, there can be different partitions for the data.
# Using the key, we can send messages to different partitions and from the consumer side, the key will ensure they will receive messages
# from the right partition. In addition, it is also possible to use consumer groups to manage consumers.
# Kafka connect can be used to integrate with existing data workflows without needing to modify code.
# Create a kafka producer with the correct credentials
# Serializers are set to use the json module to convery keys and values to json strings
# and encode them as ascii before sending to the Kafka topic. This is needed for key and values
# to be understood by a Kafka topic
import time
import json
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=f"kafka-4da1624-wssitu-05f0.aivencloud.com:19068",
security_protocol="SSL",
ssl_cafile="ca.pem",
ssl_certfile="service.cert",
ssl_keyfile="service.key",
value_serializer=lambda v: json.dumps(v).encode('ascii'),
key_serializer=lambda v: json.dumps(v).encode('ascii')
)
topic_name="Pizza"
# We use the send method to send a message to a Kafka topic
# Flush method forces any buffered message to be send to the Kafka broker.
producer.send(
topic_name,
key={"id":1},
value={"name":"John", "pizza":"Cheese"}
)
producer.flush()
producer.send(
topic_name,
key={"id":2},
value={"name":"Steve", "pizza":"Pepperoni"}
)
producer.flush()
producer.send(
topic_name,
key={"id":3},
value={"name":"Kate", "pizza":"Steak"}
)
producer.flush()
producer.send(
topic_name,
key={"id":4},
value={"name":"Jason", "pizza":"Chicken"}
)
producer.flush()
producer.send(
topic_name,
key={"id":5},
value={"name":"Kevin", "pizza":"Donaid"}
)
producer.flush()